package com.sg.common.utils;

import java.lang.reflect.InvocationTargetException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.log4j.Logger;
import org.hibernate.criterion.DetachedCriteria;

import com.sg.common.dao.BaseDao;
import com.sg.common.db.DbUtil;

public class SyncUtil {
	private static Logger log = Logger.getLogger(SyncUtil.class);
	


	public static void syncOneWayTo(String ds, String oranConStr, String oranSelect,
			final String cusSelect, final String cusUpdate,
			final String cusInsert) throws SQLException, InterruptedException {
		final Connection cusCon = DbUtil.cusCon(ds);
		Connection oranCon = DbUtil.cusCon(oranConStr);
		PreparedStatement ps = null;
		ResultSet rs = null;
		try {
			ps = oranCon.prepareStatement(oranSelect);
			rs = ps.executeQuery();
			ResultSetMetaData metaData = rs.getMetaData();
			final int columnCount = metaData.getColumnCount();
			List<String[]> nsl = new ArrayList<String[]>();
			while (rs.next()) {
				int i = 0;
				String[] ns = new String[columnCount];
				while (i < columnCount) {
					ns[i] = rs.getString(++i);
				}
				nsl.add(ns);
			}
			final CountDownLatch count = new CountDownLatch(nsl.size());
			final ExecutorService executorService = Executors
					.newFixedThreadPool(50);
			for (String[] nsx : nsl) {
				final String[] ns = nsx;
				executorService.submit(new Runnable() {
					public void run() {
						PreparedStatement ps = null;
						ResultSet rs = null;
						PreparedStatement ups = null;
						PreparedStatement lps = null;
						try {
							ps = cusCon.prepareStatement(cusSelect);
							ps.setString(1, ns[0]);
							rs = ps.executeQuery();
							if (rs.next()) {
								// 包含则更新
								ups = cusCon.prepareStatement(cusUpdate);
								int j = 1;
								while (j < columnCount) {
									ups.setString(j, ns[j++]);
								}
								ups.setString(j, ns[0]);
								ups.executeUpdate();
							} else {
								lps = cusCon.prepareStatement(cusInsert);
								int k = 1;
								while (k <= columnCount) {
									lps.setString(k, ns[k - 1]);
									k++;
								}
								lps.executeUpdate();
							}
						} catch (Exception e) {
							e.printStackTrace();
						} finally {
							// 运动员到达终点,count数减一
							count.countDown();
							try {
								if (ps != null) {
									ps.close();
								}
								if (rs != null) {
									rs.close();
								}
								if (ups != null) {
									ups.close();
								}
								if (lps != null) {
									lps.close();
								}
							} catch (SQLException e) {
								// TODO Auto-generated catch block
								e.printStackTrace();
							}
						}
					}
				});
			}
			long s = System.currentTimeMillis();
			log.info("同步信息, from " + s);

			count.await();
			long e = System.currentTimeMillis();
			log.info("同步信息, lasts " + (e - s));
			executorService.shutdown();
		} catch (SQLException e) {
			e.printStackTrace();
		} finally {
			rs.close();
			ps.close();
			cusCon.close();
			oranCon.close();
		}
	}
	/**
	 * oran ----sync----> cus
	 * 
	 * 
	 * 
	 * @param baseDao
	 * @param clazz     前面两个参数用于获取cus现有数据
	 * @param oranConStr 源数据库
	 * @param cusConStr 对象数据库
	 * @param selSql 从oran删选需要同步过来的sql语句
	 * @param insertSql 在cus上新增sql
	 * @param updateSql 在cus上更新sql
	 * @param comparePropertyName oran中比较基准属性
	 * @param insertType newIndentity--新增时赋予32位id  else--新增时按select出来的导入
	 * 
	 * @throws IllegalAccessException
	 * @throws InvocationTargetException
	 * @throws NoSuchMethodException
	 * @throws SQLException
	 * @throws InterruptedException
	 */
	public static void syncOneWay(BaseDao baseDao, String oranConStr, String cusConStr, String selSql,
			Class<?> clazz, final String insertSql, final String updateSql,
			final String comparePropertyName,final String insertType) throws IllegalAccessException, InvocationTargetException, NoSuchMethodException, SQLException, InterruptedException{
		final int compareOranLocate = 0;// 默认memberSelSql的第一个为被比较对象。
		DetachedCriteria dc = DetachedCriteria.forClass(clazz);
		final List<Object> olist = baseDao.find(dc);
		Map<String,Object> omap = new HashMap<String, Object>();
		for(Object o:olist){
			omap.put(ReUtils.getStr(o, comparePropertyName), o);
		}
		final Map<String,Object> om =omap;
		Connection oranCon = DbUtil.cusCon(oranConStr);
		final Connection cusCon = DbUtil.cusCon(cusConStr);
		PreparedStatement ps = null;
		ResultSet rs = null;
		try {
			ps = oranCon.prepareStatement(selSql);
			rs = ps.executeQuery();
			ResultSetMetaData metaData = rs.getMetaData();
			final int columnCount = metaData.getColumnCount();
			List<String[]> nsl = new ArrayList<String[]>();
			while (rs.next()) {
				int i = 0;
				String[] ns = new String[columnCount];
				while (i < columnCount) {
					ns[i] = rs.getString(++i);
				}
				nsl.add(ns);
			}
			final CountDownLatch count = new CountDownLatch(nsl.size());
			final ExecutorService executorService = Executors
					.newFixedThreadPool(50);
			for (String[] nsx : nsl) {
				final String[] ns = nsx;
				executorService.submit(new Runnable() {
					public void run() {
						PreparedStatement ups = null;
						PreparedStatement lps = null;
						try {
							Object contain = om.get(ns[compareOranLocate]);
							if (contain != null) {
								// 包含则更新
								ups = cusCon.prepareStatement(updateSql);
								int j = 1;
								while (j < columnCount) {
									ups.setString(j, ns[j++]);
								}
								ups.setString(j, ns[compareOranLocate]);
								ups.executeUpdate();
							} else {
								lps = cusCon.prepareStatement(insertSql);
								int k = 1;
								/**
								 * ------------------------------------------------------------------------
								 * ----------------- 判断基准 更新。。。
								 * ------------------ - α β γ δ update情况下,update语句写成update xx set β=?... where α=？
								 * -------------------- ↑ ↑ ↑ ↑
								 * -------------------------------------------------------- 
								 * -------------------- A B C D  从selSql获取的原始数据 -------
								 * ------------------------------------------------- -
								 * ---------------------↓ ↓ ↓ ↓ -------
								 * ----------------- id α β γ δ insert情况下,insert语句写成insert into xx (id,α，β...) values (?,?...) 
								 * -----------------32位  赋值。。。。。。。
								 * ----------------------------------------------------------------
								 */
								if("newIndentity".equals(insertType)){
									lps.setString(k++, new UUIDGenerator()
											.generate().toString());
									while (k <= columnCount + 1) {
										lps.setString(k, ns[k - 2]);
										k++;
									}
								}
								/**
								 * ------------------------------------------------------------------------
								 * ----------------- 判断基准 更新。。。
								 * ------------------ - α β γ δ update情况下,update语句写成update xx set β=?... where α=？
								 * -------------------- ↑ ↑ ↑ ↑
								 * -------------------------------------------------------- 
								 * -------------------- A B C D  从selSql获取的原始数据 -------
								 * ------------------------------------------------- -
								 * ---------------------↓ ↓ ↓ ↓ -------
								 * ----------------- -- α β γ δ insert情况下,insert语句写成insert into xx (α，β...) values (?,?...) 
								 * -------------------- 赋值。。。。。。。
								 * ----------------------------------------------------------------
								 */
								else{
									while (k <= columnCount) {
										lps.setString(k, ns[k - 1]);
										k++;
									}
								}
								lps.executeUpdate();
							}
						} catch (Exception e) {
							e.printStackTrace();
						} finally {
							// 运动员到达终点,count数减一
							count.countDown();
							try {
								if (ups != null) {
									ups.close();
								}
								if (lps != null) {
									lps.close();
								}
							} catch (SQLException e) {
								// TODO Auto-generated catch block
								e.printStackTrace();
							}
						}
					}
				});
			}
			long s = System.currentTimeMillis();
			log.info("同步{" + clazz.getName() + "}信息, from " + s);

			count.await();
			long e = System.currentTimeMillis();
			log.info("同步{" + clazz.getName() + "}信息, lasts " + (e - s));
			executorService.shutdown();

		} catch (SQLException e) {
			e.printStackTrace();
		} finally {
			rs.close();
			ps.close();
			cusCon.close();
			oranCon.close();
		}
	}
	

	/**
	 * ------------------------------------------------------------------------
	 * ----------------- 判断基准 更新。。。
	 * ------------------ - α β γ δ update情况下,update语句写成update xx set β=?... where α=？
	 * -------------------- ↑ ↑ ↑ ↑
	 * -------------------------------------------------------- 
	 * -------------------- A B C D  从selSql获取的原始数据 -------
	 * ------------------------------------------------- -
	 * ---------------------↓ ↓ ↓ ↓ -------
	 * ----------------- id α β γ δ insert情况下,insert语句写成insert into xx (id,α，β...) values (?,?...) 
	 * -----------------32位  赋值。。。。。。。
	 * ----------------------------------------------------------------
	 * 
	 * @param baseDao
	 * @param cusConStr
	 *            连接需同步数据源
	 * @param selSql
	 * @param insertSql
	 * @param updateSql
	 * @param comparePropertyName
	 * @throws SQLException
	 * @throws IllegalAccessException
	 * @throws InvocationTargetException
	 * @throws NoSuchMethodException
	 * @throws InterruptedException 
	 */
	@Deprecated
	public static void sync(BaseDao baseDao, String cusConStr, String selSql,
			Class<?> clazz, final String insertSql, final String updateSql,
			final String comparePropertyName) throws SQLException,
			IllegalAccessException, InvocationTargetException,
			NoSuchMethodException, InterruptedException {
		final int compareOranLocate = 0;// 默认memberSelSql的第一个为被比较对象。
		DetachedCriteria dc = DetachedCriteria.forClass(clazz);
		final List<Object> olist = baseDao.find(dc);
		Map<String,Object> omap = new HashMap<String, Object>();
		for(Object o:olist){
			omap.put(ReUtils.getStr(o, comparePropertyName), o);
		}
		final Map<String,Object> om =omap;
		Connection mdmcon = null;
		final Connection cusCon = DbUtil.cusCon(cusConStr);
		PreparedStatement ps = null;
		ResultSet rs = null;
		try {
			mdmcon = DbUtil.mdmCon();
			ps = mdmcon.prepareStatement(selSql);
			rs = ps.executeQuery();
			ResultSetMetaData metaData = rs.getMetaData();
			final int columnCount = metaData.getColumnCount();
			List<String[]> nsl = new ArrayList<String[]>();
			while (rs.next()) {
				int i = 0;
				String[] ns = new String[columnCount];
				while (i < columnCount) {
					ns[i] = rs.getString(++i);
				}
				nsl.add(ns);
			}
			final CountDownLatch count = new CountDownLatch(nsl.size());
			final ExecutorService executorService = Executors
					.newFixedThreadPool(50);
			for (String[] nsx : nsl) {
				final String[] ns = nsx;
				executorService.submit(new Runnable() {
					public void run() {
						PreparedStatement ups = null;
						PreparedStatement lps = null;
						try {
							Object contain = om.get(ns[compareOranLocate]);
//							Object contain = CompareUtil.containObject(olist,
//									comparePropertyName, ns[compareOranLocate]);
							if (contain != null) {
								// 包含则更新
								ups = cusCon.prepareStatement(updateSql);
								int j = 1;
								while (j < columnCount) {
									ups.setString(j, ns[j++]);
								}
								ups.setString(j, ns[compareOranLocate]);
								ups.executeUpdate();
							} else {
								lps = cusCon.prepareStatement(insertSql);
								int k = 1;
								lps.setString(k++, new UUIDGenerator()
										.generate().toString());
								while (k <= columnCount + 1) {
									lps.setString(k, ns[k - 2]);
									k++;
								}
								lps.executeUpdate();
							}
						} catch (Exception e) {
							e.printStackTrace();
						} finally {
							// 运动员到达终点,count数减一
							count.countDown();
							try {
								if (ups != null) {
									ups.close();
								}
								if (lps != null) {
									lps.close();
								}
							} catch (SQLException e) {
								// TODO Auto-generated catch block
								e.printStackTrace();
							}
						}
					}
				});
			}
			long s = System.currentTimeMillis();
			log.info("同步{" + clazz.getName() + "}信息, from " + s);

			count.await();
			long e = System.currentTimeMillis();
			log.info("同步{" + clazz.getName() + "}信息, lasts " + (e - s));
			executorService.shutdown();

		} catch (SQLException e) {
			e.printStackTrace();
		} finally {
			rs.close();
			ps.close();
			cusCon.close();
			mdmcon.close();
		}
	}

}
